---
layout: post
date: 2024-10-27
title: "TCP Server in Zig - Part 7 - Kqueue"
description: "Using BSD's kqueue to improve our async evented TCP server."
tags: [zig]
---

<p><code>kqueue</code> is a BSD/MacOS alternative to <code>poll</code>. In most ways, <code>kqueue</code> is similar to the Linux-specific <code>epoll</code>, which itself is an incremental, but important, upgrade to <code>poll</code>. Because <code>kqueue</code> has a single function, it superficially looks like <code>poll</code>. But, as we'll soon see, that single function can behave in two different ways, making its API and the integration into our code very similar to <code>epoll</code>.</p>

<p>Because <code>kqueue</code> is rather similar to <code>epoll</code>, this part is shorter as it assumes that you're familiar with topics discussed in <a href=/TCP-Server-In-Zig-Part-6-Epoll>part 6</a>, such as edge-triggering and <code>@intFromPtr</code>.</p>

<h3 id=kevent><a href="#kevent" aria-hidden=true>kevent</a></h3>
<p>Where <code>epoll</code> has one function to modify the epoll file descriptor (<code>epoll_ctl</code>) and one to wait for notifications (<code>epoll_wait</code>), <code>kqueue</code> uses a single function for both purposes: <code>kevent</code>. However, depending on the values passed to <code>kevent</code>, it can either modify the kqueue instance or wait for notifications or both. Thus, the single function can act like either of the <code>epoll</code> functions or combine both in a single call. The <code>kevent</code> function takes 4 parameters:</p>

<ol>
    <li>The kqueue file descriptor which is the kqueue instance that we're modifying and/or waiting on. Created using <code>posix.kqueue</code>.
    <li>A list of <code>posix.Kevent</code> that represents notifications we want to add/change/delete. Known as the <code>changelist</code>. Can be empty.
    <li>A list of <code>posix.Kevent</code> that indicate readiness. Known as the <code>eventlist</code>. Can be empty.
    <li>A timeout as a <code>posix.timespec</code>. Can be null.
</ol>

<p>The key to understanding this API is knowing that when the <code>eventlist</code> is empty, <code>kevent</code> immediately returns. Thus, calling <code>kevent</code> with an empty <code>eventlist</code> is like calling <code>epoll_ctl</code>. Therefore, like <code>epoll</code> and unlike <code>poll</code>, as long as we have the kqueue instance, we can easily add, remove and change monitors.</p>

<p>The <code>kqueue</code> API has one advantage over <code>epoll</code>: we can apply modifications in bulk. Where <code>epoll_ctl</code> takes a single <code>epoll_event</code>, <code>kevent</code> takes an array of <code>Kevent</code>. In other words, with <code>kqueue</code> it should be possible to make fewer system calls.</p>
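
<p>To make the last two points concrete, here's a minimal sketch (not part of the series' server) that adds two monitors and collects readiness in a single <code>kevent</code> call. The <code>registerAndPoll</code> helper is a name made up for this illustration, and pipes stand in for sockets to keep it short. It assumes the same Zig version as the rest of this post, including the <code>sec</code> and <code>nsec</code> field names of <code>posix.timespec</code>.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical helper: adds a read monitor for two file descriptors and, in
// the same system call, asks which monitors are ready right now.
fn registerAndPoll(kfd: posix.fd_t, a: posix.fd_t, b: posix.fd_t) !usize {
    var ready_list: [4]posix.Kevent = undefined;

    // a zero timeout turns the wait into a non-blocking check
    const no_wait = posix.timespec{ .sec = 0, .nsec = 0 };

    return posix.kevent(kfd, &.{
        .{
            .ident = @intCast(a),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        },
        .{
            .ident = @intCast(b),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        },
    }, &ready_list, &no_wait);
}

pub fn main() !void {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    // two pipes stand in for two client sockets
    const first = try posix.pipe();
    defer posix.close(first[0]);
    defer posix.close(first[1]);

    const second = try posix.pipe();
    defer posix.close(second[0]);
    defer posix.close(second[1]);

    // make both read ends readable before registering anything
    _ = try posix.write(first[1], "a");
    _ = try posix.write(second[1], "b");

    const ready_count = try registerAndPoll(kfd, first[0], second[0]);
    std.debug.print("ready events: {d}\n", .{ready_count});
}
{% endhighlight %}

<p>With <code>epoll</code>, the equivalent would take two <code>epoll_ctl</code> calls followed by an <code>epoll_wait</code>.</p>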

<p>This is a working example (on BSD / MacOS). To keep it simple and similar to our first <code>epoll</code> sample, we're not leveraging the bulk-modification capabilities of the API but rather add one event at a time (the final example <em>does</em> add them in bulk): </p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                var closed = false;
                var buf: [4096]u8 = undefined;
                const read = posix.read(ready_socket, &buf) catch 0;
                if (read == 0) {
                    closed = true;
                } else {
                    std.debug.print("[{d}] got: {any}\n", .{ready_socket, buf[0..read]});
                }

                if (closed) {
                    posix.close(ready_socket);
                }
            }
        }
    }
}
{% endhighlight %}

<p>Like with <code>epoll</code>, we can attach arbitrary information via the <code>udata</code> field. Above we're using the file descriptor, but in a more complete example we'd likely use <code>@intFromPtr</code> to get a <code>usize</code> representation of an application-specific "Client" struct. The <code>Kevent</code> struct has two additional fields: <code>fflags</code> and <code>data</code>. These hold flags and data to use for different filters. With sockets, where we're only interested in the <code>READ</code> and <code>WRITE</code> filters, these should be set to zero. In a future part, we'll see a brief example of a different filter which does leverage the <code>fflags</code> field.</p>
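
<p>As a rough sketch of that idea, the following stores a pointer in <code>udata</code> and recovers it when the event is reported. The <code>Client</code> struct here, along with the <code>monitor</code> and <code>nextReady</code> helpers, are illustrative names only and not the ones used by our server; a pipe stands in for a client socket.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

// Hypothetical application state; the name and fields are illustrative only.
const Client = struct {
    id: u32,
    socket: posix.fd_t,
};

// Monitors client.socket for reads, storing the client's address in udata.
fn monitor(kfd: posix.fd_t, client: *Client) !void {
    _ = try posix.kevent(kfd, &.{.{
        .ident = @intCast(client.socket),
        .filter = posix.system.EVFILT.READ,
        .flags = posix.system.EV.ADD,
        .fflags = 0,
        .data = 0,
        .udata = @intFromPtr(client),
    }}, &.{}, null);
}

// Checks, without blocking, for a ready event and returns the client that was
// attached to it (or null if nothing is ready).
fn nextReady(kfd: posix.fd_t) !?*Client {
    var ready_list: [1]posix.Kevent = undefined;
    const no_wait = posix.timespec{ .sec = 0, .nsec = 0 };
    const ready_count = try posix.kevent(kfd, &.{}, &ready_list, &no_wait);
    if (ready_count == 0) {
        return null;
    }
    const client: *Client = @ptrFromInt(ready_list[0].udata);
    return client;
}

pub fn main() !void {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    var client = Client{ .id = 42, .socket = pipe[0] };
    try monitor(kfd, &client);

    _ = try posix.write(pipe[1], "hi");
    if (try nextReady(kfd)) |ready| {
        std.debug.print("client {d} is ready\n", .{ready.id});
    }
}
{% endhighlight %}

<p>Since the kernel only stores the integer, the <code>Client</code> must stay at the same address for as long as the monitor exists. Here it lives on <code>main</code>'s stack; a longer-lived program would typically allocate it on the heap.</p>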

<p>With <code>epoll</code> the monitors we add, modify or delete are identified by the 3rd parameter we pass to <code>epoll_ctl</code>. In all the code we've seen so far, that was either the listening socket or the client socket, but more generally, it's the file descriptor to monitor. With <code>kqueue</code> the identifier is the combination of the <code>ident</code> and <code>filter</code> fields. With <code>epoll</code> we toggled a client from read-mode to write-mode by modifying the existing notifier (identified by the socket) with the <code>CTL_MOD</code> operation. In <code>kqueue</code> we'd need to delete the read monitor and then add a write monitor. Or, and this is what we do in the full example given at the end, we add both a read and a write monitor, but disable the write monitor. We can toggle the mode by disabling the active one and enabling the disabled one:</p>

{% highlight text %}
Read Mode:
key=(ident: socket1, filter: read), enabled=true
key=(ident: socket1, filter: write), enabled=false

Write Mode:
key=(ident: socket1, filter: read), enabled=false
key=(ident: socket1, filter: write), enabled=true
{% endhighlight %}
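
<p>One way to sketch that toggle is a small function which builds both changes, so that they can be handed to <code>kevent</code> as a single <code>changelist</code>. The <code>Mode</code> enum and <code>modeChanges</code> function are hypothetical names for this illustration, and the socket value used in <code>main</code> is a placeholder, since nothing is actually submitted to a kqueue instance here.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;

const Mode = enum { read, write };

// Builds the two changes that switch `socket` into `mode`: the monitor for the
// requested mode is enabled, the other one is disabled. Both monitors are
// assumed to have been added (EV.ADD) beforehand.
fn modeChanges(socket: posix.socket_t, udata: usize, mode: Mode) [2]posix.Kevent {
    const read_flags: u16 = if (mode == .read) posix.system.EV.ENABLE else posix.system.EV.DISABLE;
    const write_flags: u16 = if (mode == .write) posix.system.EV.ENABLE else posix.system.EV.DISABLE;

    return .{
        .{
            .ident = @intCast(socket),
            .filter = posix.system.EVFILT.READ,
            .flags = read_flags,
            .fflags = 0,
            .data = 0,
            .udata = udata,
        },
        .{
            .ident = @intCast(socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = write_flags,
            .fflags = 0,
            .data = 0,
            .udata = udata,
        },
    };
}

pub fn main() !void {
    // 7 is a placeholder file descriptor; we only inspect the generated changes
    const changes = modeChanges(7, 0, .write);
    for (changes) |change| {
        std.debug.print("filter={d} flags={d}\n", .{ change.filter, change.flags });
    }
}
{% endhighlight %}

<p>With a real kqueue instance, the result could be applied with something like <code>_ = try posix.kevent(kfd, &changes, &.{}, null);</code>, switching modes in one system call.</p>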

<p>This also means that <code>filter</code> isn't a bitwise flag. To check if the socket is ready for reading, we just have to compare <code>ready.filter == posix.system.EVFILT.READ</code>.</p>

<h3 id=edge_triggered><a href="#edge_triggered" aria-hidden=true>Edge-Triggered</a></h3>
<p>In addition to flags like <code>EV.ADD</code>, <code>EV.ENABLE</code> and <code>EV.DISABLE</code>, we can also set <code>EV.ONESHOT</code>, <code>EV.DISPATCH</code> and <code>EV.CLEAR</code>.</p>

<p><code>EV.ONESHOT</code> removes the notification after readiness has been reported, making it one-time-only. The notification has to be re-added using the <code>EV.ADD</code> flag.</p>

<p><code>EV.DISPATCH</code> is similar but rather than removing the notification, it disables it (thus, <code>EV.DISPATCH</code> is like <code>EPOLL.ONESHOT</code>). To re-arm the notification, it has to be submitted again with the <code>EV.ENABLE</code> or <code>EV.ADD</code> flag ("adding" an already added entry, whether it's disabled or not, does not create a duplicate, and will re-enable it if disabled). The difference between removing (<code>ONESHOT</code>) and disabling (<code>DISPATCH</code>) is that disabling and re-enabling is faster but takes a bit more memory since the internal structure is kept. If you intend to frequently re-arm the notification, <code>EV.DISPATCH</code> might be a better choice.</p>
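
<p>Here's a small sketch of that re-arming cycle. It's an experiment rather than server code: a pipe stands in for a socket, and <code>pending</code>, <code>change</code> and <code>dispatchThenRearm</code> are names invented for this example.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;
const EV = posix.system.EV;

// Non-blocking check: how many notifications are pending right now?
fn pending(kfd: posix.fd_t) !usize {
    var ready_list: [8]posix.Kevent = undefined;
    const no_wait = posix.timespec{ .sec = 0, .nsec = 0 };
    return posix.kevent(kfd, &.{}, &ready_list, &no_wait);
}

// Applies a single change to the read monitor of `fd`.
fn change(kfd: posix.fd_t, fd: posix.fd_t, flags: u16) !void {
    _ = try posix.kevent(kfd, &.{.{
        .ident = @intCast(fd),
        .filter = posix.system.EVFILT.READ,
        .flags = flags,
        .fflags = 0,
        .data = 0,
        .udata = 0,
    }}, &.{}, null);
}

// Returns the number of notifications seen: after the first write, on a
// second check, and after re-enabling the monitor.
fn dispatchThenRearm() ![3]usize {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    try change(kfd, pipe[0], EV.ADD | EV.DISPATCH);
    _ = try posix.write(pipe[1], "hi");

    var counts: [3]usize = undefined;
    // reported once, after which the monitor is disabled
    counts[0] = try pending(kfd);
    // the data is still unread, but the disabled monitor stays quiet
    counts[1] = try pending(kfd);
    // re-arm; the unread data should be reported again
    try change(kfd, pipe[0], EV.ENABLE);
    counts[2] = try pending(kfd);
    return counts;
}

pub fn main() !void {
    std.debug.print("{any}\n", .{try dispatchThenRearm()});
}
{% endhighlight %}

<p>Because we never read from the pipe, re-enabling the monitor is enough to be notified again. A real server would read the data before re-arming.</p>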

<p><code>EV.CLEAR</code> is similar to <code>EPOLL.ET</code>, causing <code>kevent</code> to signal state change rather than readiness.</p>

<p>As before, we can take our above code and strip out the "read" handling to see the various behaviors. Only the end of the code was changed:</p>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;

pub fn main() !void {
    const address = try std.net.Address.parseIp("127.0.0.1", 5882);

    const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
    const protocol = posix.IPPROTO.TCP;
    const listener = try posix.socket(address.any.family, tpe, protocol);
    defer posix.close(listener);

    try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
    try posix.bind(listener, &address.any, address.getOsSockLen());
    try posix.listen(listener, 128);

    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    {
        // monitor our listening socket
        _ = try posix.kevent(kfd, &.{.{
            .ident = @intCast(listener),
            .flags = posix.system.EV.ADD,
            .filter = posix.system.EVFILT.READ,
            .fflags = 0,
            .data = 0,
            .udata = @intCast(listener),
        }}, &.{}, null);
    }

    var ready_list: [128]posix.Kevent = undefined;
    while (true) {
        const ready_count = try posix.kevent(kfd, &.{}, &ready_list, null);
        for (ready_list[0..ready_count]) |ready| {
            const ready_socket: i32 = @intCast(ready.udata);
            if (ready_socket == listener) {
                const client_socket = try posix.accept(listener, null, null, posix.SOCK.NONBLOCK);
                errdefer posix.close(client_socket);
                _ = try posix.kevent(kfd, &.{.{
                    .ident = @intCast(client_socket),
                    .flags = posix.system.EV.ADD,
                    .filter = posix.system.EVFILT.READ,
                    .fflags = 0,
                    .data = 0,
                    .udata = @intCast(client_socket),
                }}, &.{}, null);
            } else {
                // THIS WAS CHANGED
                std.debug.print(".", .{});
            }
        }
    }
}
{% endhighlight %}

<p>If you connect to the above and send a message, your screen will get flooded with dots (<code>.</code>) as <code>kevent</code> will continuously notify about the socket's readiness (since we never read from it). Changing the <code>flags</code> of the added client socket from <code>posix.system.EV.ADD</code> to one of:</p>

<ul>
    <li><code>posix.system.EV.ADD | posix.system.EV.ONESHOT</code>,
    <li><code>posix.system.EV.ADD | posix.system.EV.DISPATCH</code>, or
    <li><code>posix.system.EV.ADD | posix.system.EV.CLEAR</code>.
</ul>

<p>will show how each behaves. For the first two, <code>ONESHOT</code> and <code>DISPATCH</code>, no matter how much data we send, we'll only ever get a single notification. We'd need to re-add or re-enable (i.e. re-arm) the notification to get another. For <code>CLEAR</code> we'll get a single notification each time new data becomes ready.</p>
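
<p>If you'd rather not juggle a server and a client, the same experiment can be approximated in a single program. The sketch below is an assumption-laden stand-in for the TCP version: it uses a pipe instead of a socket, and <code>pending</code> and <code>observe</code> are helper names made up for the occasion. For each set of flags it counts the notifications reported after a first write, on an immediate second check, and after a second write.</p>

{% highlight zig %}
const std = @import("std");
const posix = std.posix;
const EV = posix.system.EV;

// Non-blocking check: how many notifications are pending right now?
fn pending(kfd: posix.fd_t) !usize {
    var ready_list: [8]posix.Kevent = undefined;
    const no_wait = posix.timespec{ .sec = 0, .nsec = 0 };
    return posix.kevent(kfd, &.{}, &ready_list, &no_wait);
}

// Monitors the read end of a fresh pipe using `flags`, never reads from it,
// and records how many notifications each check reports.
fn observe(flags: u16) ![3]usize {
    const kfd = try posix.kqueue();
    defer posix.close(kfd);

    const pipe = try posix.pipe();
    defer posix.close(pipe[0]);
    defer posix.close(pipe[1]);

    _ = try posix.kevent(kfd, &.{.{
        .ident = @intCast(pipe[0]),
        .filter = posix.system.EVFILT.READ,
        .flags = flags,
        .fflags = 0,
        .data = 0,
        .udata = 0,
    }}, &.{}, null);

    var counts: [3]usize = undefined;
    _ = try posix.write(pipe[1], "a");
    counts[0] = try pending(kfd);
    // nothing new was written and nothing was read
    counts[1] = try pending(kfd);
    _ = try posix.write(pipe[1], "b");
    counts[2] = try pending(kfd);
    return counts;
}

pub fn main() !void {
    std.debug.print("ADD:            {any}\n", .{try observe(EV.ADD)});
    std.debug.print("ADD | ONESHOT:  {any}\n", .{try observe(EV.ADD | EV.ONESHOT)});
    std.debug.print("ADD | DISPATCH: {any}\n", .{try observe(EV.ADD | EV.DISPATCH)});
    std.debug.print("ADD | CLEAR:    {any}\n", .{try observe(EV.ADD | EV.CLEAR)});
}
{% endhighlight %}

<p>If the flags behave as described above, plain <code>ADD</code> should report a notification on every check, <code>ONESHOT</code> and <code>DISPATCH</code> only on the first, and <code>CLEAR</code> on the first and third (each time new data arrives).</p>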

<h3 id=conclusion><a href="#conclusion" aria-hidden=true>Conclusion</a></h3>
<p>Although <code>kqueue</code> and <code>epoll</code> are platform-specific, they're quite similar, allowing us to create a simple abstraction to target either platform - the topic of our next part. Furthermore, their similarity has the benefit of resulting in a rather short post!</p>

<p>A more complete example is included below, including our <code>Server</code>, <code>Client</code> and writes. Here you'll see the <code>udata</code> field used to reference a <code>Client</code> (via <code>@intFromPtr</code> and <code>@ptrFromInt</code>).</p>

<p>This code leverages the bulk-modification capabilities of <code>kevent</code>. When we add or modify a notification, we "stage" these in a local <code>change_list</code>. Only when the <code>change_list</code> is full or <code>KQueue.wait</code> is called do we apply the changes. In the latter case, applying the changes and waiting for readiness is done in a single system call. All of this is a simple but effective way to reduce the number of system calls we must make.</p>

<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-6-Epoll/>Epoll</a>
  <a class=next href=/TCP-Server-In-Zig-Part-8-Epoll-and-Kqueue/>Epoll & Kqueue</a>
</div>

<h3 id=appendix-a><a href="#appendix-a" aria-hidden=true>Appendix A - Code</a></h3>
<aside><p>This example will only run on BSD / MacOS. If you try to run it in docker, you might need to change the listening address from <code>127.0.0.1</code> to <code>0.0.0.0</code>.</p></aside>

{% highlight zig %}
const std = @import("std");
const net = std.net;
const posix = std.posix;
const system = std.posix.system;
const Allocator = std.mem.Allocator;

const log = std.log.scoped(.tcp_demo);

pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    const allocator = gpa.allocator();

    var server = try Server.init(allocator, 4096);
    defer server.deinit();

    const address = try std.net.Address.parseIp("0.0.0.0", 5882);
    try server.run(address);
    std.debug.print("STOPPED\n", .{});
}

// 1 minute
const READ_TIMEOUT_MS = 60_000;

const ClientList = std.DoublyLinkedList(*Client);
const ClientNode = ClientList.Node;

const Server = struct {
    // maximum # of allowed clients
    max: usize,

    loop: KQueue,

    // used by our memory pools and passed to Client.init for it to create
    // its read and write buffers.
    allocator: Allocator,

    // The number of clients we currently have connected
    connected: usize,

    read_timeout_list: ClientList,

    // for creating client
    client_pool: std.heap.MemoryPool(Client),
    // for creating nodes for our read_timeout list
    client_node_pool: std.heap.MemoryPool(ClientList.Node),

    fn init(allocator: Allocator, max: usize) !Server {
        const loop = try KQueue.init();
        errdefer loop.deinit();

        return .{
            .max = max,
            .loop = loop,
            .connected = 0,
            .allocator = allocator,
            .read_timeout_list = .{},
            .client_pool = std.heap.MemoryPool(Client).init(allocator),
            .client_node_pool = std.heap.MemoryPool(ClientNode).init(allocator),
        };
    }

    fn deinit(self: *Server) void {
        self.loop.deinit();
        self.client_pool.deinit();
        self.client_node_pool.deinit();
    }

    fn run(self: *Server, address: std.net.Address) !void {
        const tpe: u32 = posix.SOCK.STREAM | posix.SOCK.NONBLOCK;
        const protocol = posix.IPPROTO.TCP;
        const listener = try posix.socket(address.any.family, tpe, protocol);
        defer posix.close(listener);

        try posix.setsockopt(listener, posix.SOL.SOCKET, posix.SO.REUSEADDR, &std.mem.toBytes(@as(c_int, 1)));
        try posix.bind(listener, &address.any, address.getOsSockLen());
        try posix.listen(listener, 128);
        const read_timeout_list = &self.read_timeout_list;

        try self.loop.addListener(listener);

        while (true) {
            const next_timeout = self.enforceTimeout();
            const ready_events = try self.loop.wait(next_timeout);
            for (ready_events) |ready| {
                switch (ready.udata) {
                    0 => self.accept(listener) catch |err| log.err("failed to accept: {}", .{err}),
                    else => |nptr| {
                        const filter = ready.filter;
                        const client: *Client = @ptrFromInt(nptr);

                        if (filter == system.EVFILT.READ) {
                            // this socket is ready to be read
                            while (true) {
                                const msg = client.readMessage() catch {
                                    self.closeClient(client);
                                    break;
                                } orelse break;   // no more messages

                                client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
                                read_timeout_list.remove(client.read_timeout_node);
                                read_timeout_list.append(client.read_timeout_node);

                                client.writeMessage(msg) catch {
                                    self.closeClient(client);
                                    break;
                                };
                            }
                        } else if (filter == system.EVFILT.WRITE) {
                            client.write() catch self.closeClient(client);
                        }
                    }
                }
            }
        }
    }

    fn enforceTimeout(self: *Server) i32 {
        const now = std.time.milliTimestamp();
        var node = self.read_timeout_list.first;
        while (node) |n| {
            const client = n.data;
            const diff = client.read_timeout - now;
            if (diff > 0) {
                // this client's timeout is the first one that's in the
                // future, so we now know the maximum time we can block on
                // kevent before having to call enforceTimeout again
                return @intCast(diff);
            }

            // This client's timeout is in the past. Rather than closing it
            // here, we shut down the receiving side of the socket. The next
            // read then returns 0, which the main loop handles like any
            // other closed connection (see closeClient).
            posix.shutdown(client.socket, .recv) catch {};
            node = n.next;
        } else {
            // We have no client that times out in the future (if we did
            // we would have hit the return above).
            return -1;
        }
    }

    fn accept(self: *Server, listener: posix.socket_t) !void {
        const space = self.max - self.connected;
        for (0..space) |_| {
            var address: net.Address = undefined;
            var address_len: posix.socklen_t = @sizeOf(net.Address);
            const socket = posix.accept(listener, &address.any, &address_len, posix.SOCK.NONBLOCK) catch |err| switch (err) {
                error.WouldBlock => return,
                else => return err,
            };

            const client = try self.client_pool.create();
            errdefer self.client_pool.destroy(client);
            client.* = Client.init(self.allocator, socket, address, &self.loop) catch |err| {
                posix.close(socket);
                log.err("failed to initialize client: {}", .{err});
                return;
            };
            errdefer client.deinit(self.allocator);

            client.read_timeout = std.time.milliTimestamp() + READ_TIMEOUT_MS;
            client.read_timeout_node = try self.client_node_pool.create();
            errdefer self.client_node_pool.destroy(client.read_timeout_node);

            client.read_timeout_node.* = .{
                .next = null,
                .prev = null,
                .data = client,
            };
            self.read_timeout_list.append(client.read_timeout_node);
            try self.loop.newClient(client);
            self.connected += 1;
        } else {
            // we've run out of space, stop monitoring the listening socket
            try self.loop.removeListener(listener);
        }
    }

    fn closeClient(self: *Server, client: *Client) void {
        // free up the slot so accept can hand it to a new client
        self.connected -= 1;
        self.read_timeout_list.remove(client.read_timeout_node);

        posix.close(client.socket);
        self.client_node_pool.destroy(client.read_timeout_node);
        client.deinit(self.allocator);
        self.client_pool.destroy(client);
    }
};

const Client = struct {
    loop: *KQueue,

    socket: posix.socket_t,
    address: std.net.Address,

    // Used to read length-prefixed messages
    reader: Reader,

    // Bytes we still need to send. This is a slice of `write_buf`. When
    // empty, then we're in "read-mode" and are waiting for a message from the
    // client.
    to_write: []u8,

    // Buffer for storing our length-prefixed messages
    write_buf: []u8,

    // absolute time, in millisecond, when this client should timeout if
    // a message isn't received
    read_timeout: i64,

    // Node containing this client in the server's read_timeout_list
    read_timeout_node: *ClientNode,

    fn init(allocator: Allocator, socket: posix.socket_t, address: std.net.Address, loop: *KQueue) !Client {
        const reader = try Reader.init(allocator, 4096);
        errdefer reader.deinit(allocator);

        const write_buf = try allocator.alloc(u8, 4096);
        errdefer allocator.free(write_buf);

        return .{
            .loop = loop,
            .reader = reader,
            .socket = socket,
            .address = address,
            .to_write = &.{},
            .write_buf = write_buf,
            .read_timeout = 0, // let the server set this
            .read_timeout_node = undefined, // hack/ugly, let the server set this when init returns
        };
    }

    fn deinit(self: *const Client, allocator: Allocator) void {
        self.reader.deinit(allocator);
        allocator.free(self.write_buf);
    }

    fn readMessage(self: *Client) !?[]const u8 {
        return self.reader.readMessage(self.socket) catch |err| switch (err) {
            error.WouldBlock => return null,
            else => return err,
        };
    }

    fn writeMessage(self: *Client, msg: []const u8) !void {
        if (self.to_write.len > 0) {
            // Depending on how you structure your code, this might not be possible
            // For example, in an HTTP server, the application might not control
            // the actual "writeMessage" call, and thus it would not be possible
            // to make more than one writeMessage call per request.
            // For this demo, we'll just return an error.
            return error.PendingMessage;
        }

        if (msg.len + 4 > self.write_buf.len) {
            // Could allocate a dynamic buffer. Could use a large buffer pool.
            return error.MessageTooLarge;
        }

        // copy our length prefix + message to our buffer
        std.mem.writeInt(u32, self.write_buf[0..4], @intCast(msg.len), .little);
        const end = msg.len + 4;
        @memcpy(self.write_buf[4..end], msg);

        // setup our to_write slice
        self.to_write = self.write_buf[0..end];

        // immediately write what we can
        return self.write();
    }

    // Writes as much of `to_write` as the socket will accept. If the socket
    // would block, we switch to write-mode and finish later; once everything
    // is written, we switch back to read-mode.
    fn write(self: *Client) !void {
        var buf = self.to_write;
        defer self.to_write = buf;
        while (buf.len > 0) {
            const n = posix.write(self.socket, buf) catch |err| switch (err) {
                error.WouldBlock => return self.loop.writeMode(self),
                else => return err,
            };

            if (n == 0) {
                return error.Closed;
            }
            buf = buf[n..];
        } else {
            return self.loop.readMode(self);
        }
    }
};

const Reader = struct {
    buf: []u8,
    pos: usize = 0,
    start: usize = 0,

    fn init(allocator: Allocator, size: usize) !Reader {
        const buf = try allocator.alloc(u8, size);
        return .{
            .pos = 0,
            .start = 0,
            .buf = buf,
        };
    }

    fn deinit(self: *const Reader, allocator: Allocator) void {
        allocator.free(self.buf);
    }

    fn readMessage(self: *Reader, socket: posix.socket_t) ![]u8 {
        var buf = self.buf;

        while (true) {
            if (try self.bufferedMessage()) |msg| {
                return msg;
            }
            const pos = self.pos;
            const n = try posix.read(socket, buf[pos..]);
            if (n == 0) {
                return error.Closed;
            }
            self.pos = pos + n;
        }
    }

    fn bufferedMessage(self: *Reader) !?[]u8 {
        const buf = self.buf;
        const pos = self.pos;
        const start = self.start;

        std.debug.assert(pos >= start);
        const unprocessed = buf[start..pos];
        if (unprocessed.len < 4) {
            self.ensureSpace(4 - unprocessed.len) catch unreachable;
            return null;
        }

        const message_len = std.mem.readInt(u32, unprocessed[0..4], .little);

        // the length of our message + the length of our prefix
        const total_len = message_len + 4;

        if (unprocessed.len < total_len) {
            try self.ensureSpace(total_len);
            return null;
        }

        self.start += total_len;
        return unprocessed[4..total_len];
    }

    fn ensureSpace(self: *Reader, space: usize) error{BufferTooSmall}!void {
        const buf = self.buf;
        if (buf.len < space) {
            return error.BufferTooSmall;
        }

        const start = self.start;
        const spare = buf.len - start;
        if (spare >= space) {
            return;
        }

        const unprocessed = buf[start..self.pos];
        std.mem.copyForwards(u8, buf[0..unprocessed.len], unprocessed);
        self.start = 0;
        self.pos = unprocessed.len;
    }
};

// We'll eventually need to build a platform abstractions between epoll and
// kqueue. This is a rough start.
const KQueue = struct {
    kfd: posix.fd_t,
    event_list: [128]system.Kevent = undefined,
    change_list: [16]system.Kevent = undefined,
    change_count: usize = 0,

    fn init() !KQueue {
        const kfd = try posix.kqueue();
        return .{.kfd = kfd};
    }

    fn deinit(self: KQueue) void {
        posix.close(self.kfd);
    }

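    fn wait(self: *KQueue, timeout_ms: i32) ![]system.Kevent {
        // enforceTimeout returns -1 when no client has a pending timeout. In
        // that case we pass a null timespec so that kevent blocks until
        // something is ready.
        var ts: posix.timespec = undefined;
        var timeout: ?*const posix.timespec = null;
        if (timeout_ms >= 0) {
            ts = .{
                .sec = @intCast(@divTrunc(timeout_ms, 1000)),
                .nsec = @intCast(@mod(timeout_ms, 1000) * 1000000),
            };
            timeout = &ts;
        }
        const count = try posix.kevent(self.kfd, self.change_list[0..self.change_count], &self.event_list, timeout);
        self.change_count = 0;
        return self.event_list[0..count];
    }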

    fn addListener(self: *KQueue, listener: posix.socket_t) !void {
        // ok to use EV.ADD to re-enable the listener if it was previously
        // disabled via removeListener
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn removeListener(self: *KQueue, listener: posix.socket_t) !void {
        try self.queueChange(.{
            .ident = @intCast(listener),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });
    }

    fn newClient(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ADD,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.ADD | posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn readMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.WRITE,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.ENABLE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });

    }

    fn writeMode(self: *KQueue, client: *Client) !void {
        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .filter = posix.system.EVFILT.READ,
            .flags = posix.system.EV.DISABLE,
            .fflags = 0,
            .data = 0,
            .udata = 0,
        });

        try self.queueChange(.{
            .ident = @intCast(client.socket),
            .flags = posix.system.EV.ENABLE,
            .filter = posix.system.EVFILT.WRITE,
            .fflags = 0,
            .data = 0,
            .udata = @intFromPtr(client),
        });
    }

    fn queueChange(self: *KQueue, event: system.Kevent) !void {
        var count = self.change_count;
        if (count == self.change_list.len) {
            // our change_list batch is full, apply it
            _ = try posix.kevent(self.kfd, &self.change_list, &.{}, null);
            count = 0;
        }
        self.change_list[count] = event;
        self.change_count = count + 1;
    }
};
{% endhighlight %}

<div class=pager>
  <a class=prev href=/TCP-Server-In-Zig-Part-6-Epoll/>Epoll</a>
  <a class=next href=/TCP-Server-In-Zig-Part-8-Epoll-and-Kqueue/>Epoll & Kqueue</a>
</div>
